View Javadoc

1   /*
2    * Copyright (c) 2004-2005, University Health Network.  All rights reserved. Distributed under the BSD 
3    * license (see http://opensource.org/licenses/bsd-license.php).
4    *  
5    * Created on 30-Dec-2004
6    */
7   package ca.uhn.cache.util;
8   
9   import org.apache.commons.logging.Log;
10  import org.apache.commons.logging.LogFactory;
11  
12  import EDU.oswego.cs.dl.util.concurrent.Executor;
13  import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
14  
15  import ca.uhn.cache.IQuery;
16  import ca.uhn.cache.IQueryResult;
17  import ca.uhn.cache.ISemanticCache;
18  import ca.uhn.cache.exception.CacheException;
19  
20  /***
21   * TODO: TESTS 
22   * 
23   * Ecapsulates the common caching algorithm of querying a cache, getting a 
24   * remainder, querying the original data source, and combining the results.  
25   * 
26   * Objects of this class hold the state of a single query instance.
27   * 
28   * It is left to the caller to provide results from the original data source.     
29   * 
30   * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
31   * @version $Revision: 1.1 $ updated on $Date: 2005/01/24 22:51:49 $ by $Author: bryan_tripp $
32   */
33  public class QueryProcessor {
34      
35      private static final Log ourLog = LogFactory.getLog(QueryProcessor.class);
36  
37      private final IQuery myQuery;
38      private final ISemanticCache myCache;
39      private IQuery[] myRemainderQueries;
40      private IQueryResult myCombinedResult;
41      private CacheException myException;
42      private Executor myExecutor;
43      
44      /***
45       * Constructor for STATIONARY DATA ONLY.  See IUnstationaryDataSource for discussion. 
46       * 
47       * @param theQuery the query to process
48       * @param theCache the cache against which to process it 
49       * @param theMaxGroups maximum number of remainder queries 
50       * @param theExecutor handler of concurrent tasks (defaults to ThreadedExecutor if null) 
51       * 
52       * @throws CacheException if there is a problem finding the query remainder
53       */
54      public QueryProcessor(final IQuery theQuery, final ISemanticCache theCache, 
55              int theMaxGroups, Executor theExecutor) throws CacheException {
56          myQuery = theQuery;
57          myCache = theCache;
58          myExecutor = theExecutor;
59          
60          if (myExecutor == null) {
61              myExecutor = new ThreadedExecutor();
62          }
63          
64          myRemainderQueries = theCache.remainder(theQuery, theMaxGroups);
65          init();
66      }
67          
68      /***
69       * Performs startup tasks (called by constructor).  The default is to start
70       * running the cache query.  Subclasses can over-ride.   
71       */
72      protected void init() {
73          final QueryProcessor processor = this;
74          
75          Runnable getter = new Runnable() {
76              public void run() {
77                  try {
78                      processor.setCombinedResult(myCache.get(myQuery));
79                  } catch (CacheException e) {
80                      processor.declareException("Problem getting data from cache", e);
81                  }
82              }
83          };
84          
85          thread(getter);        
86      }
87      
88      /***
89       * @return queries defining those parts of the query associated with this processor 
90       *      that are not contained in the cache
91       */
92      public IQuery[] getRemainderQueries() {
93          return myRemainderQueries;
94      }
95      
96      /***
97       * Provide an initial value for the combined result.  
98       * @param theResult initial value
99       */
100     protected void setCombinedResult(IQueryResult theResult) {
101         myCombinedResult = theResult;
102     }
103     
104     /***
105      * @param theMessage description
106      * @param theException an exception that has been encountered that may render query 
107      *      results incorrect (for example exception while querying cache or source data)
108      */
109     public void declareException(String theMessage, Exception theException) {
110         myException = new CacheException(theMessage, theException) {};
111     }
112     
113     /***
114      * @param theQuery scope of the results (from a remainder query)
115      * @param theSourceResult corresponding results 
116      */
117     public void setSourceResult(final IQuery theQuery, final IQueryResult theSourceResult) {
118         
119         final QueryProcessor processor = this;        
120         Runnable combiner = new Runnable() {
121             public void run() {
122                 try {
123                     synchronized (processor) {
124                         IQueryResult current = processor.getCombinedResult();
125                         IQueryResult combined = current.append(theSourceResult);
126                         processor.setCombinedResult(combined);
127                     }
128                 } catch (CacheException e) {
129                     //we don't declareException() here because it has already been declared  
130                     ourLog.error("Can't get cache result with which to combine source data", e);
131                 }
132             }
133         };        
134         thread(combiner);
135         
136         final ISemanticCache cache = myCache;        
137         Runnable filler = new Runnable() {
138             public void run() {
139                 try {
140                     cache.put(theQuery, theSourceResult);
141                 } catch (CacheException e) {
142                     ourLog.error("Can't populate cache with queried data", e); 
143                 }
144             }
145         };        
146         thread(filler);
147     }
148     
149     /***
150      * Blocks until a result is available or an exception is encountered.  
151      * 
152      * @return results including both cached data and data from the source (as they are added)
153      * @throws CacheException if a problem has been encountered getting data from the cache  
154      */
155     public IQueryResult getCombinedResult() throws CacheException {
156         while (myCombinedResult == null && myException == null) {
157             try {
158                 Thread.sleep(1);
159             } catch (InterruptedException e) {
160                 //OK
161             }
162         }
163         
164         if (myException != null) {
165             throw myException;
166         }
167         
168         return myCombinedResult;
169     }
170     
171     /***
172      * Runs a task in its own thread. 
173      *  
174      * @param theTask the task to run 
175      */
176     protected void thread(Runnable theTask) {
177         try {
178             myExecutor.execute(theTask);
179         } catch (InterruptedException e) {
180             ourLog.error("Task interrupted", e);
181         }
182     }
183 
184 }